/*
 * Copyright (c) 2022. China Mobile (SuZhou) Software Technology Co.,Ltd. All rights reserved.
 * Lakehouse is licensed under Mulan PSL v2.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *          http://license.coscl.org.cn/MulanPSL2
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 */

package com.chinamobile.cmss.lakehouse.service.engine;

import com.chinamobile.cmss.lakehouse.common.dto.ResourceCheckResult;
import com.chinamobile.cmss.lakehouse.common.enums.TaskType;
import com.chinamobile.cmss.lakehouse.common.kubernetes.ComputeResource;
import com.chinamobile.cmss.lakehouse.common.kubernetes.ResourceSizeConfig;
import com.chinamobile.cmss.lakehouse.common.utils.ClusterUtil;
import com.chinamobile.cmss.lakehouse.common.utils.KeyValue;
import com.chinamobile.cmss.lakehouse.core.queue.QueueResourceService;
import com.chinamobile.cmss.lakehouse.dao.entity.LakehouseClusterInfoEntity;
import com.chinamobile.cmss.lakehouse.service.redis.RedisOperateClient;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class ResourceManagerHandler {

    @Autowired
    private RedisOperateClient redisClient;

    // NOTE(review): never referenced directly below — only static members of
    // ResourceSizeConfig are used. Possibly injected so the config bean is
    // initialized; confirm before removing.
    @Autowired
    private ResourceSizeConfig resourceConfig;

    @Autowired
    private QueueResourceService queueResourceService;

    /**
     * Tries to allocate executors and a driver for one new task on the given cluster.
     *
     * <p>Returns {@code (instanceNum, (executorCpus, driverResource))}. When the
     * cluster is already at its task-parallelism limit, or the queue service
     * (Yunikorn) rejects the resource request, a zeroed allocation
     * {@code (0, (0, ComputeResource(cpu=0, memory=0)))} is returned instead of throwing.
     *
     * @param cluster cluster whose compute resource is being carved up
     * @param type    task type; per the original inline note, JDBC tasks increment
     *                the running-task counter before reaching here, so one slot is
     *                subtracted back before the parallelism check
     * @return (executor instance count, (cpus per executor, driver resource))
     * @throws IllegalStateException if the computed executor count or driver cpu is
     *                               not positive, or the per-task resource slice is
     *                               too small (signature keeps {@code throws Exception}
     *                               for caller compatibility)
     */
    public KeyValue<Integer, KeyValue<Integer, ComputeResource>> allocate(LakehouseClusterInfoEntity cluster,
                                                                          TaskType type) throws Exception {
        // get all running task
        long running = redisClient.getRunningTask(cluster.getInstance());
        if (TaskType.JDBC.equals(type)) {
            // jdbc was +1 to occupy the task before
            running--;
        }
        // Guard clause: cluster is already running at its parallelism limit.
        if (running >= cluster.getTaskParallelize()) {
            log.warn("Running task:{} more than cluster parallelize: {}", running, cluster.getTaskParallelize());
            return emptyAllocation();
        }
        // Check if has enough resource
        KeyValue<Integer, KeyValue<Integer, ComputeResource>> instanceAndDriver =
            getInstanceNumAndDriverResource(cluster);
        int instanceNum = instanceAndDriver.key();
        if (instanceNum < 1) {
            // FIX: was `throw new Exception(...)` — use a specific unchecked type
            // for a programming-error condition; still assignable to the declared
            // `throws Exception`, so callers catching Exception are unaffected.
            throw new IllegalStateException("Executor number must be positive!");
        }
        int executorCpus = instanceAndDriver.value().key();
        ComputeResource driverResource = instanceAndDriver.value().value();
        if (driverResource.getCpu() < 1) {
            throw new IllegalStateException("Driver cpu must be positive!");
        }
        // Total request = driver + all executors; memory derived from cpu via the
        // configured cpu-to-memory scale.
        int requestCpu = driverResource.getCpu() + instanceNum * executorCpus;
        int requestMem = driverResource.getMemory()
            + instanceNum * executorCpus * ResourceSizeConfig.resourceScale;
        ResourceCheckResult result = queueResourceService.runnable(
            ClusterUtil.convertCluster2Namespace(cluster.getInstance()), requestCpu, requestMem);
        if (result.isResult()) {
            return KeyValue.of(instanceNum, KeyValue.of(executorCpus, driverResource));
        }
        log.warn("Yunikorn returns: can not run!");
        return emptyAllocation();
    }

    /** The "no allocation possible" sentinel: zero instances, zero cpus, empty driver. */
    private static KeyValue<Integer, KeyValue<Integer, ComputeResource>> emptyAllocation() {
        return KeyValue.of(0, KeyValue.of(0, ComputeResource.builder().cpu(0).memory(0).build()));
    }

    /**
     * Splits one task's share of the cluster's compute resource into a driver
     * and a set of identically-sized executors.
     *
     * <p>The per-task slice is the cluster resource divided by the task
     * parallelism. The driver initially takes about 1/5 of the slice's cpu
     * (clamped to 1..4 cpus); executors are sized by
     * {@link #getDefaultExecutorCpu(int)}; whatever the executors do not consume
     * is folded back into the driver so the full slice is used.
     *
     * @param clusterInfo cluster whose resource size and parallelism are read
     * @return (executor instance count, (cpus per executor, driver resource))
     * @throws IllegalArgumentException if the configured parallelism is not positive
     * @throws IllegalStateException    if the per-task slice is too small to host
     *                                  a driver plus at least one executor
     */
    protected KeyValue<Integer, KeyValue<Integer, ComputeResource>> getInstanceNumAndDriverResource(
        LakehouseClusterInfoEntity clusterInfo) {
        // get parallelize
        int parallelize = clusterInfo.getTaskParallelize();
        if (parallelize < 1) {
            // FIX: guards the divisions below; previously an unchecked
            // ArithmeticException (divide by zero) for a misconfigured cluster.
            throw new IllegalArgumentException(
                "Task parallelize must be positive, got: " + parallelize);
        }
        int scale = ResourceSizeConfig.resourceScale;

        ComputeResource computeResource =
            ResourceSizeConfig.getResource(clusterInfo.getComputeResourceSize());
        int requestCpu = computeResource.getCpu() / parallelize;
        int requestMem = computeResource.getMemory() / parallelize;
        // FIX: these precondition checks were `assert` statements, which are
        // disabled unless the JVM runs with -ea — so they never fired in
        // production and undersized slices fell through to the math below.
        if (requestCpu <= 1 || requestMem <= scale) {
            throw new IllegalStateException(String.format(
                "parallelize(%d) is too large, no enough resource to compute", parallelize));
        }
        // drive min 1c 4g, max 4c 16g
        // driver will request 1/5 resource
        int driverCpu = Math.min(4, Math.max(1, requestCpu / 5));
        int driverMem = driverCpu * scale;

        int remainingCpu = requestCpu - driverCpu;
        int remainingMem = requestMem - driverMem;

        // FIX: also previously an `assert` — see note above.
        if (remainingCpu <= 0 || remainingMem <= 0) {
            throw new IllegalStateException("Resource must be positive.");
        }

        // default set cpu 4 for one executor, at least 1
        int defaultExecutorCpu = getDefaultExecutorCpu(remainingCpu);
        // Instance count limited by both cpu and memory; never below 1.
        int instanceNum = Math.max(1, Math.min(remainingCpu / defaultExecutorCpu,
            remainingMem / (defaultExecutorCpu * scale)));
        // Give any leftover (rounding remainder) back to the driver.
        driverCpu = requestCpu - instanceNum * defaultExecutorCpu;
        driverMem = requestMem - instanceNum * defaultExecutorCpu * scale;
        return KeyValue.of(instanceNum, KeyValue.of(defaultExecutorCpu,
            ComputeResource.builder().memory(driverMem).cpu(driverCpu).build()));
    }

    /**
     * Picks the per-executor cpu count: 5 when the remaining pool is large
     * (more than 20 cpus), otherwise up to 4, capped by what remains.
     *
     * @param remainingCpu cpus left after the driver's initial share
     * @return cpus to assign each executor (at least 1 when remainingCpu >= 1)
     */
    private int getDefaultExecutorCpu(int remainingCpu) {
        return remainingCpu > 20 ? 5 : Math.min(remainingCpu, 4);
    }
}
